In [ ]:
import torch
import pandas as pd
import numpy as np
import torch.nn as nn


import  sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from tqdm import tqdm
In [ ]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
In [ ]:
dataset_path = "imbalanced-benchmarking-set/datasets/MagicTelescope.csv"

df = pd.read_csv(dataset_path)


df = df.drop(df.columns[0], axis=1)

# Y - "TARGET"
# X - all other columns

Y = df["TARGET"]
X = df.drop(columns=["TARGET"])



unique_Y = Y.unique()
map_Y = dict(zip(unique_Y, range(len(unique_Y))))
print(map_Y)

Y = Y.map(map_Y)

number_of_classes = len(unique_Y)
print("Number of classes: ", number_of_classes)

number_of_features = len(X.columns)
print("Number of features: ", number_of_features)

X_df = X.copy()
Y_df = Y.copy()

X = X.to_numpy()
Y = Y.to_numpy()

X_not_normalized = X.copy()
X = sklearn.preprocessing.normalize(X)

print("X shape: ", X.shape)
print("Y shape: ", Y.shape)

X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.33, random_state=42
)

print("X_train shape: ", X_train.shape)
print("X_test shape: ", X_test.shape)
print("Y_train shape: ", Y_train.shape)
print("Y_test shape: ", Y_test.shape)
{'g': 0, 'h': 1}
Number of classes:  2
Number of features:  10
X shape:  (19020, 10)
Y shape:  (19020,)
X_train shape:  (12743, 10)
X_test shape:  (6277, 10)
Y_train shape:  (12743,)
Y_test shape:  (6277,)
In [ ]:
# Logistic Regression
model_LR = sklearn.linear_model.LogisticRegression(
    penalty="l2", C=1.0, solver="liblinear", max_iter=1000
)

# Decision Tree
model_DT = sklearn.tree.DecisionTreeClassifier(
    criterion="gini", splitter="best", max_depth=None, min_samples_split=2
)

# Neural Network
internal_size = 128
model_NN_flat = nn.Sequential(
    nn.Linear(number_of_features,internal_size ),
    nn.LeakyReLU(),
    nn.Linear(internal_size, internal_size),
    nn.LeakyReLU(),
    nn.Linear(internal_size, number_of_classes),
    nn.Softmax(dim=1),
)



# Neural Network deep
internal_size_deep = 32
layers = 5

layers_list = []
layers_list.append(nn.Linear(number_of_features, internal_size_deep))
for i in range(layers):
    layers_list.append(nn.LeakyReLU())
    layers_list.append(nn.Linear(internal_size_deep, internal_size_deep))
layers_list.append(nn.LeakyReLU())
layers_list.append(nn.Linear(internal_size_deep, number_of_classes))
layers_list.append(nn.Softmax(dim=1))

model_NN_deep = nn.Sequential(*layers_list)

model_list_sklearn = [model_LR, model_DT]
model_names_sklearn = ["Logistic Regression", "Decision Tree"]
model_list_pytorch = [model_NN_flat, model_NN_deep]
model_names_pytorch = ["Neural Network flat", "Neural Network deep"]
In [ ]:
for model in model_list_sklearn:
    model.fit(X_train, Y_train)
    
for model in model_list_pytorch:
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(X_train).float(), torch.from_numpy(Y_train).long()
    )
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=128, shuffle=True)
    # allow for early stopping
    patience = 5
    patience_counter = 0
    best_loss = np.inf
    for epoch in tqdm(range(100)):
        epoch_loss = 0
        for batch in dataloader:
            optimizer.zero_grad()
            X_batch, Y_batch = batch
            output = model(X_batch)
            loss = criterion(output, Y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        epoch_loss /= len(dataloader)
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience_counter = 0
        else:
            patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping")
            break
    print("Best loss: ", best_loss)
 40%|████      | 40/100 [00:28<00:42,  1.43it/s]
Early stopping
Best loss:  0.4764955559372902
 28%|██▊       | 28/100 [00:25<01:05,  1.11it/s]
Early stopping
Best loss:  0.4907132241129875

In [ ]:
for model,model_name in zip(model_list_sklearn[:-1], model_names_sklearn[:-1]):
    Y_pred = model.predict(X_test)
    print("Model: ", model_name)
    print("Accuracy: ", accuracy_score(Y_test, Y_pred))
    print(classification_report(Y_test, Y_pred))
    print("Confusion matrix: ")
    print(confusion_matrix(Y_test, Y_pred))
    
dataset_test = torch.utils.data.TensorDataset(
    torch.from_numpy(X_test).float(), torch.from_numpy(Y_test).long()
)
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=128, shuffle=False)    

for model,model_name in zip(model_list_pytorch[:-1], model_names_pytorch[:-1]):
    model.eval()
    Y_pred = []
    
    for batch in dataloader_test:
        X_batch, Y_batch = batch
        output = model(X_batch)
        Y_pred.append(torch.argmax(output, dim=1).numpy())
    Y_pred = np.concatenate(Y_pred)
    print("Model: ", model_name)
    print("Accuracy: ", accuracy_score(Y_test, Y_pred))
    print(classification_report(Y_test, Y_pred))
    print("Confusion matrix: ")
    print(confusion_matrix(Y_test, Y_pred))
Model:  Logistic Regression
Accuracy:  0.7270989326111199
              precision    recall  f1-score   support

           0       0.73      0.92      0.81      4071
           1       0.71      0.38      0.49      2206

    accuracy                           0.73      6277
   macro avg       0.72      0.65      0.65      6277
weighted avg       0.72      0.73      0.70      6277

Confusion matrix: 
[[3728  343]
 [1370  836]]
Model:  Neural Network flat
Accuracy:  0.8247570495459614
              precision    recall  f1-score   support

           0       0.84      0.90      0.87      4071
           1       0.79      0.68      0.73      2206

    accuracy                           0.82      6277
   macro avg       0.81      0.79      0.80      6277
weighted avg       0.82      0.82      0.82      6277

Confusion matrix: 
[[3669  402]
 [ 698 1508]]
In [ ]:
import lime
import lime.lime_tabular

explainer = lime.lime_tabular.LimeTabularExplainer(
    X_train,
    feature_names=df.columns[:-1],
    class_names=unique_Y,
    discretize_continuous=False,
)
In [ ]:
# LIME explanations for LR model
for i in range(5):
    i = np.random.randint(0, len(X_test))

    exp = explainer.explain_instance(
        X_test[i],
        model_list_sklearn[0].predict_proba,
        num_features=5,
        top_labels=1,
        num_samples=1000,
    )

    exp.show_in_notebook(show_table=True, show_all=False)
In [ ]:
import random
sum_weights_map = {}
#samples = len(X_test)
samples = 1000
for i in tqdm(range(samples)):
    sample = random.randint(0, len(X_test))
    exp = explainer.explain_instance(
        X_test[sample],
        model_list_sklearn[0].predict_proba,
        num_features=10,
        top_labels=1,
        num_samples=1000,
    )
    for feature, weight in exp.as_map()[list(exp.as_map().keys())[0]]:
        if feature not in sum_weights_map:
            sum_weights_map[feature] = 0
        sum_weights_map[feature] += weight

for feature in sum_weights_map:
    sum_weights_map[feature] /= samples
print(sum_weights_map)
100%|██████████| 1000/1000 [00:03<00:00, 253.48it/s]
{0: -0.06994100820278096, 8: -0.06612607596945722, 6: 0.03435833168776913, 1: 0.028186806528907586, 5: 0.023216026236353424, 9: 0.017200376130859514, 2: 0.004839845771756387, 4: 8.092740820925331e-05, 3: 0.00024973159738149214, 7: -0.0002733769076092749}

In [ ]:
import matplotlib.pyplot as plt
# bar plot using labels and weights
weights_with_labels = list(zip(sum_weights_map.values(), df.columns[:-1]))
weights_with_labels.sort(key=lambda x: x[0])

plt.figure(figsize=(10, 10))
plt.barh([x[1] for x in weights_with_labels], [x[0] for x in weights_with_labels])
plt.xlabel("Weight")
plt.ylabel("Feature")
plt.title("Feature importance for LR model")
Out[ ]:
Text(0.5, 1.0, 'Feature importance for LR model')
In [ ]:
# LIME explanations for DNN model
explainer = lime.lime_tabular.LimeTabularExplainer(
    X_train,
    feature_names=df.columns[:-1],
    class_names=unique_Y,
    discretize_continuous=True,
)

for i in range(5):
    i = np.random.randint(0, len(X_test))

    exp = explainer.explain_instance(
        X_test[i],
        model_list_sklearn[0].predict_proba,
        num_features=5,
        top_labels=1,
        num_samples=1000,
    )

    exp.show_in_notebook(show_table=True, show_all=False)
In [ ]:
sum_weights_map = {}
#samples = len(X_test)
samples = 1000
for i in tqdm(range(samples)):
    exp = explainer.explain_instance(
        X_test[i],
        lambda x: model_list_pytorch[0](torch.from_numpy(x).type(torch.float32)).detach().numpy(),
        num_features=10,
        top_labels=1,
        num_samples=1000,
    )
    for feature, weight in exp.as_map()[list(exp.as_map().keys())[0]]:
        if feature not in sum_weights_map:
            sum_weights_map[feature] = 0
        sum_weights_map[feature] += weight

for feature in sum_weights_map:
    sum_weights_map[feature] /= samples
100%|██████████| 1000/1000 [00:12<00:00, 82.91it/s]
In [ ]:
# bar plot for DNN model
weights_with_labels = list(zip(sum_weights_map.values(), df.columns[:-1]))
weights_with_labels.sort(key=lambda x: x[0])

plt.figure(figsize=(10, 10))
plt.barh([x[1] for x in weights_with_labels], [x[0] for x in weights_with_labels])
plt.xlabel("Weight")
plt.ylabel("Feature")
plt.title("Feature importance for DNN model")
Out[ ]:
Text(0.5, 1.0, 'Feature importance for DNN model')
In [ ]:
# SHAP for linear model
import shap

shap.initjs()
pred = model_list_sklearn[0].predict(X_test)
explainer = shap.KernelExplainer(model_list_sklearn[0].predict_proba, shap.sample(X_test, 100))
shap_values = explainer.shap_values(shap.sample(X_test, 100))
shap.summary_plot(shap_values, X_test, feature_names=df.columns[:-1], class_names=unique_Y)
  0%|          | 0/100 [00:00<?, ?it/s]
In [ ]:
# SHAP for DNN model
pred = model_list_pytorch[0](torch.from_numpy(X_test).type(torch.float32)).detach().numpy()
explainer = shap.KernelExplainer(lambda x: model_list_pytorch[0](torch.from_numpy(x).type(torch.float32)).detach().numpy(), shap.sample(X_test, 100))
shap_values = explainer.shap_values(shap.sample(X_test, 100))
shap.summary_plot(shap_values, X_test, feature_names=df.columns[:-1], class_names=unique_Y)
  0%|          | 0/100 [00:00<?, ?it/s]